AWS IoTを使ったIoTエッジソフト開発(Python on Raspberry Pi OS)でやって良かったこと

AWS IoTを使ったIoTエッジソフト開発(Python on Raspberry Pi OS)でやって良かったこと

Clock Icon2022.10.12

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

はじめに

タイトルの通りですが、Raspberry Pi OS上で動作するIoTエッジ開発でやって良かったことを紹介します。

本記事の注意点

  • 本記事ではハードウェアとの連携部分に関してはほぼありません。あくまでIoTデバイス側のソフトウェア実装(これを本記事ではエッジ側呼びます)を、クラウド(AWS IoT Core)と連携する上で気をつけて良かったことを紹介します
  • 本記事で紹介するプログラムは断片的です。詳細は適宜reterminal-sampleを参照してください。Mac, Raspberry Pi OS両方で動作します。利用方法は、記事の末尾の項.付録を参照してください
  • MQTT接続にはaws-crt-pythonを利用前提で記載しています。実現できなかったことに関しては、ラッパーであるaws-iot-device-sdk-python-v2でのみ可能かどうかを検証しています。他のライブラリに関しては未検証です
  • 各解決策はベストプラックティスではないため、参考程度でお願いします
  • 中にはエッジ開発じゃなくても当然でしょ、、という項目もありますが改めて確認という形で見て頂けると幸いです
  • 組み込み開発ではありません。Pythonを用いたサーバーサイドアプリ開発とほぼ同じです

環境

利用するOS、ツールチェインは以下の通りです。

# Raspberry Pi OSバージョン
$ lsb_release -a
No LSB modules are available.
Distributor ID: Raspbian
Description:    Raspbian GNU/Linux 10 (buster)
Release:        10
Codename:       buster

# Pythonバージョン
$ python3 --version
Python 3.10.4

# poetryバージョン
$ poetry --version
Poetry (version 1.2.1)

MQTT関連

サブスクライブトピックへパブリッシュ時に処理を行うコールバック関数はすぐに完了させ、重い処理は別スレッドで行う

事象/再現コード

事象を説明するため、以下のようなコードを示します。

ソースコード
import utils.config as Config
import utils.logger as Logger
from awscrt import io, mqtt
from threading import Thread
import json
import time

config = Config.getConfig()
Logger.init()

logger = Logger.getLogger(__name__)

AWS_IOT_ENDPOINT = config.IotEndpoint
# AWS IoTのサポートするkeep-alive 5 - 1200 s
KEEP_ALIVE_SECS = 5
# ms単位なので、KEEP_ALIVE_SECS * 1000 より短い必要がある
PING_TIMEOUT_MS = 1000
THING_NAME = "test-thing"

SUB_TOPIC_A = "sub/a"
SUB_TOPIC_B = "sub/b"


class PublishThread(Thread):
    def __init__(self, mqtt_connection=None, publish_topic_name=None):
        self.connection = mqtt_connection
        self.publish_topic_name = publish_topic_name
        Thread.__init__(self)

    def run(self):

        while True:
            timestamp = int(time.time())
            logger.info(f"publish {self.name} {timestamp}")
            self.connection.publish(
                topic=self.publish_topic_name,
                payload=json.dumps({f"{self.native_id}": timestamp}),
                qos=mqtt.QoS.AT_MOST_ONCE,
            )

            logger.info("publish done")

            time.sleep(1)


def createMQTTConnection(
    device_certificate_filepath: str,
    private_key_filepath: str,
    ca_certificate_filepath: str,
):
    event_loop_group = io.EventLoopGroup(1)
    host_resolver = io.DefaultHostResolver(event_loop_group)
    client_bootstrap = io.ClientBootstrap(event_loop_group, host_resolver)
    client_bootstrap = io.ClientBootstrap(event_loop_group, host_resolver)

    tls_ctx_options = io.TlsContextOptions.create_client_with_mtls_from_path(
        device_certificate_filepath, private_key_filepath
    )
    tls_ctx_options.override_default_trust_store_from_path(
        None, ca_certificate_filepath
    )
    tls_ctx = io.ClientTlsContext(tls_ctx_options)

    mqtt_client = mqtt.Client(client_bootstrap, tls_ctx)
    mqtt_connection = mqtt.Connection(
        client=mqtt_client,
        client_id=THING_NAME,
        host_name=AWS_IOT_ENDPOINT,
        port=8883,
        clean_session=False,
        keep_alive_secs=KEEP_ALIVE_SECS,
        ping_timeout_ms=PING_TIMEOUT_MS,
    )

    connect_future = mqtt_connection.connect()
    connect_future.result()

    return mqtt_connection


def sub_heavy_callback(topic, payload, dup, qos, retain):
    logger.info("call heavy func")
    logger.info(f"topic: {topic}, dup: {dup}, qos: {qos}, retain: {retain}")
    logger.info(f"payload: {payload.decode('utf-8')}")
    time.sleep(10)
    logger.info("end heavy func")


def sub_light_callback(topic, payload, dup, qos, retain):
    logger.info("call light func")
    logger.info(f"topic: {topic}, dup: {dup}, qos: {qos}, retain: {retain}")
    logger.info(f"payload: {payload.decode('utf-8')}")
    logger.info("end light func")


def main():
    logger.info("start main thread")
    logger.info("start create mqtt connection")
    connection = createMQTTConnection(
        device_certificate_filepath="./data/device_certificate.pem",
        private_key_filepath="./data/private.key",
        ca_certificate_filepath="./data/ca_certificate.pem",
    )
    logger.info("end create mqtt connection")

    subTopic1Publisher = PublishThread(connection, SUB_TOPIC_A)
    subTopic1Publisher.daemon = True
    subTopic1Publisher.name = SUB_TOPIC_A
    subTopic1Publisher.start()

    subTopic2Publisher = PublishThread(connection, SUB_TOPIC_B)
    subTopic2Publisher.daemon = True
    subTopic2Publisher.name = SUB_TOPIC_B
    subTopic2Publisher.start()

    connection.subscribe(
        topic=SUB_TOPIC_A, qos=mqtt.QoS.AT_LEAST_ONCE, callback=sub_heavy_callback
    )
    connection.subscribe(
        topic=SUB_TOPIC_B, qos=mqtt.QoS.AT_LEAST_ONCE, callback=sub_light_callback
    )

    while True:
        time.sleep(10)


if __name__ == "__main__":
    main()

わかり辛いので、図にすると以下の通りです。

iot drawio

※ idはスレッドIDを示します。後述する実行ログと合わせて確認してください。

以下の状態を1つのコードで表しています。

  • 2つのスレッドが1秒間に1回topicにパブリッシュ
  • パブリッシュされた2つのトピックをそれぞれサブスクライブする
    • サブスクライブトピックsub/aのコールバック関数の処理時間は10秒
    • サブスクライブトピックsub/bのコールバック関数の処理時間は数ms

あくまで問題を再現させるためのコードです。実際には、パブリッシュする仕組みはクラウド側にあると思います。

実行結果のログは以下の通りです。

実行ログ
$ poetry run python3 ./src/mqtt-sub-callback.py
2022-10-09 19:53:26,194 __main__ [INFO] main (3069579328): start main thread
2022-10-09 19:53:26,195 __main__ [INFO] main (3069579328): start create mqtt connection
2022-10-09 19:53:28,141 __main__ [INFO] main (3069579328): end create mqtt connection
2022-10-09 19:53:28,142 __main__ [INFO] run (3038770240): publish sub/a 1665312808
2022-10-09 19:53:28,143 __main__ [INFO] run (3038770240): publish done
2022-10-09 19:53:28,143 __main__ [INFO] run (3028284480): publish sub/b 1665312808
2022-10-09 19:53:28,144 __main__ [INFO] run (3028284480): publish done
2022-10-09 19:53:28,233 __main__ [INFO] sub_heavy_callback (3059283008): call heavy func
2022-10-09 19:53:28,234 __main__ [INFO] sub_heavy_callback (3059283008): topic: sub/a, dup: False, qos: 0, retain: False
2022-10-09 19:53:28,234 __main__ [INFO] sub_heavy_callback (3059283008): payload: {"2407": 1665312808}
2022-10-09 19:53:29,144 __main__ [INFO] run (3038770240): publish sub/a 1665312809
2022-10-09 19:53:29,145 __main__ [INFO] run (3038770240): publish done
2022-10-09 19:53:29,145 __main__ [INFO] run (3028284480): publish sub/b 1665312809
2022-10-09 19:53:29,146 __main__ [INFO] run (3028284480): publish done
2022-10-09 19:53:30,146 __main__ [INFO] run (3038770240): publish sub/a 1665312810
2022-10-09 19:53:30,147 __main__ [INFO] run (3038770240): publish done
2022-10-09 19:53:30,148 __main__ [INFO] run (3028284480): publish sub/b 1665312810
2022-10-09 19:53:30,148 __main__ [INFO] run (3028284480): publish done
2022-10-09 19:53:31,149 __main__ [INFO] run (3038770240): publish sub/a 1665312811
2022-10-09 19:53:31,150 __main__ [INFO] run (3038770240): publish done
2022-10-09 19:53:31,150 __main__ [INFO] run (3028284480): publish sub/b 1665312811
2022-10-09 19:53:31,151 __main__ [INFO] run (3028284480): publish done
2022-10-09 19:53:32,151 __main__ [INFO] run (3038770240): publish sub/a 1665312812
2022-10-09 19:53:32,152 __main__ [INFO] run (3038770240): publish done
2022-10-09 19:53:32,153 __main__ [INFO] run (3028284480): publish sub/b 1665312812
2022-10-09 19:53:32,153 __main__ [INFO] run (3028284480): publish done
2022-10-09 19:53:33,153 __main__ [INFO] run (3038770240): publish sub/a 1665312813
2022-10-09 19:53:33,154 __main__ [INFO] run (3038770240): publish done
2022-10-09 19:53:33,154 __main__ [INFO] run (3028284480): publish sub/b 1665312813
2022-10-09 19:53:33,155 __main__ [INFO] run (3028284480): publish done
2022-10-09 19:53:34,156 __main__ [INFO] run (3038770240): publish sub/a 1665312814
2022-10-09 19:53:34,156 __main__ [INFO] run (3028284480): publish sub/b 1665312814
2022-10-09 19:53:34,157 __main__ [INFO] run (3038770240): publish done
2022-10-09 19:53:34,157 __main__ [INFO] run (3028284480): publish done
2022-10-09 19:53:35,159 __main__ [INFO] run (3038770240): publish sub/a 1665312815
2022-10-09 19:53:35,159 __main__ [INFO] run (3028284480): publish sub/b 1665312815
2022-10-09 19:53:35,160 __main__ [INFO] run (3038770240): publish done
2022-10-09 19:53:35,160 __main__ [INFO] run (3028284480): publish done
2022-10-09 19:53:36,162 __main__ [INFO] run (3028284480): publish sub/b 1665312816
2022-10-09 19:53:36,162 __main__ [INFO] run (3038770240): publish sub/a 1665312816
2022-10-09 19:53:36,163 __main__ [INFO] run (3028284480): publish done
2022-10-09 19:53:36,163 __main__ [INFO] run (3038770240): publish done
2022-10-09 19:53:37,165 __main__ [INFO] run (3028284480): publish sub/b 1665312817
2022-10-09 19:53:37,165 __main__ [INFO] run (3038770240): publish sub/a 1665312817
2022-10-09 19:53:37,166 __main__ [INFO] run (3028284480): publish done
2022-10-09 19:53:37,166 __main__ [INFO] run (3038770240): publish done
2022-10-09 19:53:38,168 __main__ [INFO] run (3028284480): publish sub/b 1665312818
2022-10-09 19:53:38,168 __main__ [INFO] run (3038770240): publish sub/a 1665312818
2022-10-09 19:53:38,169 __main__ [INFO] run (3028284480): publish done
2022-10-09 19:53:38,169 __main__ [INFO] run (3038770240): publish done
2022-10-09 19:53:38,237 __main__ [INFO] sub_heavy_callback (3059283008): end heavy func
2022-10-09 19:53:38,237 __main__ [INFO] sub_light_callback (3059283008): call light func
2022-10-09 19:53:38,238 __main__ [INFO] sub_light_callback (3059283008): topic: sub/b, dup: False, qos: 0, retain: False
2022-10-09 19:53:38,238 __main__ [INFO] sub_light_callback (3059283008): payload: {"2408": 1665312808}
2022-10-09 19:53:38,238 __main__ [INFO] sub_light_callback (3059283008): end light func

ログを元に課題を整理します。

[課題1] コールバック関数実行中に別のサブスクライブトピックのコールバック関数実行がブロックされる

ログの通り、 sub/a トピックのコールバック関数実行中に sub/b のコールバック関数の実行がブロックされています。

sub/a sub/bトピックへほぼ同時にpublishしています。しかし、sub/aのコールバック関数(実行完了には10秒かかる)のみが実行され、sub/bのコールバック関数は実行されていません。 sub/bのコールバック関数が実行されるのは、sub/aのコールバック関数が終了してからです。

コールバック関数が実行されているスレッドIDが同じことから、内部的にはサブスクライブ時に受け取ったイベントをキューイングして同じスレッドを使い回し、シーケンシャルに実行していることが推測できます。

[課題2] コールバック関数の実行時間がkeep-aliveより長い場合、エッジ側が接続断として検知される

AWS IoTの設定でログを有効化すると(有効化方法は、記事の最後の項.付録を参照)、AWSIoTLogsV2にログが出力されます。

実行ログ
{
    "timestamp": "2022-10-09 10:53:27.884",
    "logLevel": "INFO",
    "traceId": "88fa4463-8119-a0af-13bc-376a26ee8e0d",
    "accountId": "",
    "status": "Success",
    "eventType": "Connect",
    "protocol": "MQTT",
    "clientId": "test-thing",
    "principalId": "b670a35bf5f9640d4cf300d5c6d149fb0fa81dab64cb01eb54bab5812f514925",
    "sourceIp": "xxx",
    "sourcePort": 43052
}
{
    "timestamp": "2022-10-09 10:53:28.158",
    "logLevel": "INFO",
    "traceId": "560e8bfc-f6b8-567f-90c6-72d0f2bcc2d8",
    "accountId": "",
    "status": "Success",
    "eventType": "Publish-In",
    "protocol": "MQTT",
    "topicName": "sub/a",
    "clientId": "test-thing",
    "principalId": "b670a35bf5f9640d4cf300d5c6d149fb0fa81dab64cb01eb54bab5812f514925",
    "sourceIp": "xxx",
    "sourcePort": 43052
}
{
    "timestamp": "2022-10-09 10:53:28.159",
    "logLevel": "INFO",
    "traceId": "758bdeca-2f08-428a-c444-a68bbdb0d313",
    "accountId": "",
    "status": "Success",
    "eventType": "Publish-Out",
    "protocol": "MQTT",
    "topicName": "sub/a",
    "clientId": "test-thing",
    "principalId": "b670a35bf5f9640d4cf300d5c6d149fb0fa81dab64cb01eb54bab5812f514925",
    "sourceIp": "xxx",
    "sourcePort": 43052
}
{
    "timestamp": "2022-10-09 10:53:28.182",
    "logLevel": "INFO",
    "traceId": "da12bd57-d32e-9065-6980-c1792991a51a",
    "accountId": "",
    "status": "Success",
    "eventType": "Publish-In",
    "protocol": "MQTT",
    "topicName": "sub/b",
    "clientId": "test-thing",
    "principalId": "b670a35bf5f9640d4cf300d5c6d149fb0fa81dab64cb01eb54bab5812f514925",
    "sourceIp": "xxx",
    "sourcePort": 43052
}
{
    "timestamp": "2022-10-09 10:53:28.183",
    "logLevel": "INFO",
    "traceId": "e4432668-b809-84c1-21d9-3cf24baa5a32",
    "accountId": "",
    "status": "Success",
    "eventType": "Publish-Out",
    "protocol": "MQTT",
    "topicName": "sub/b",
    "clientId": "test-thing",
    "principalId": "b670a35bf5f9640d4cf300d5c6d149fb0fa81dab64cb01eb54bab5812f514925",
    "sourceIp": "xxx",
    "sourcePort": 43052
}
{
    "timestamp": "2022-10-09 10:53:28.233",
    "logLevel": "INFO",
    "traceId": "11d11c50-4237-5d0d-b776-3ab153870819",
    "accountId": "",
    "status": "Success",
    "eventType": "Subscribe",
    "protocol": "MQTT",
    "topicName": "sub/b",
    "clientId": "test-thing",
    "principalId": "b670a35bf5f9640d4cf300d5c6d149fb0fa81dab64cb01eb54bab5812f514925",
    "sourceIp": "xxx",
    "sourcePort": 43052
}
{
    "timestamp": "2022-10-09 10:53:28.239",
    "logLevel": "INFO",
    "traceId": "e2acda51-fdba-ae22-edb1-5bbab217d1a9",
    "accountId": "",
    "status": "Success",
    "eventType": "Subscribe",
    "protocol": "MQTT",
    "topicName": "sub/a",
    "clientId": "test-thing",
    "principalId": "b670a35bf5f9640d4cf300d5c6d149fb0fa81dab64cb01eb54bab5812f514925",
    "sourceIp": "xxx",
    "sourcePort": 43052
}
{
    "timestamp": "2022-10-09 10:53:35.703",
    "logLevel": "INFO",
    "traceId": "7cf5e46f-19e5-855e-1276-9da84eee31bc",
    "accountId": "",
    "status": "Success",
    "eventType": "Disconnect",
    "protocol": "MQTT",
    "clientId": "test-thing",
    "principalId": "b670a35bf5f9640d4cf300d5c6d149fb0fa81dab64cb01eb54bab5812f514925",
    "sourceIp": "xxx",
    "sourcePort": 43052,
    "disconnectReason": "MQTT_KEEP_ALIVE_TIMEOUT"
}
{
    "timestamp": "2022-10-09 10:53:38.538",
    "logLevel": "INFO",
    "traceId": "4e788e77-bce1-abf6-16c5-3e4ceb1febac",
    "accountId": "",
    "status": "Success",
    "eventType": "Connect",
    "protocol": "MQTT",
    "clientId": "test-thing",
    "principalId": "b670a35bf5f9640d4cf300d5c6d149fb0fa81dab64cb01eb54bab5812f514925",
    "sourceIp": "xxx",
    "sourcePort": 43102
}
{
    "timestamp": "2022-10-09 10:53:39.183",
    "logLevel": "INFO",
    "traceId": "a389304e-4392-acef-3d9f-3c674115a836",
    "accountId": "",
    "status": "Success",
    "eventType": "Publish-In",
    "protocol": "MQTT",
    "topicName": "sub/b",
    "clientId": "test-thing",
    "principalId": "b670a35bf5f9640d4cf300d5c6d149fb0fa81dab64cb01eb54bab5812f514925",
    "sourceIp": "123.198.93.31",
    "sourcePort": 43102
}
{
    "timestamp": "2022-10-09 10:53:39.184",
    "logLevel": "INFO",
    "traceId": "8d1d4b31-95ba-7671-d1c9-172acabf0ccb",
    "accountId": "",
    "status": "Success",
    "eventType": "Publish-Out",
    "protocol": "MQTT",
    "topicName": "sub/b",
    "clientId": "test-thing",
    "principalId": "b670a35bf5f9640d4cf300d5c6d149fb0fa81dab64cb01eb54bab5812f514925",
    "sourceIp": "123.198.93.31",
    "sourcePort": 43102
}
{
    "timestamp": "2022-10-09 10:53:39.432",
    "logLevel": "INFO",
    "traceId": "89d9bc0c-2c64-d4d4-85dd-50c4fc060a0d",
    "accountId": "",
    "status": "Success",
    "eventType": "Publish-In",
    "protocol": "MQTT",
    "topicName": "sub/a",
    "clientId": "test-thing",
    "principalId": "b670a35bf5f9640d4cf300d5c6d149fb0fa81dab64cb01eb54bab5812f514925",
    "sourceIp": "123.198.93.31",
    "sourcePort": 43102
}
{
    "timestamp": "2022-10-09 10:53:39.434",
    "logLevel": "INFO",
    "traceId": "148fe046-adb3-b2a5-4d4e-6d7c5b0c418f",
    "accountId": "",
    "status": "Success",
    "eventType": "Publish-Out",
    "protocol": "MQTT",
    "topicName": "sub/a",
    "clientId": "test-thing",
    "principalId": "b670a35bf5f9640d4cf300d5c6d149fb0fa81dab64cb01eb54bab5812f514925",
    "sourceIp": "123.198.93.31",
    "sourcePort": 43102
}
{
    "timestamp": "2022-10-09 10:53:40.511",
    "logLevel": "INFO",
    "traceId": "dd7ec295-0a06-fb83-3c3a-d68309305036",
    "accountId": "",
    "status": "Success",
    "eventType": "Disconnect",
    "protocol": "MQTT",
    "clientId": "test-thing",
    "principalId": "b670a35bf5f9640d4cf300d5c6d149fb0fa81dab64cb01eb54bab5812f514925",
    "sourceIp": "123.198.93.31",
    "sourcePort": 43102,
    "disconnectReason": "CONNECTION_LOST"
}

ログを確認すると、以下のことが分かります。

コールバック関数実行中の2022-10-09 10:53:35.703Disconnect (MQTT_KEEP_ALIVE_TIMEOUT)しています。 前述のソースコードより、keep-aliveは5秒が設定されています。コールバック関数の実行に10秒かかる今回のケースでは、タイムアウトとして検知されます。実際トピックからメッセージを受けて7秒後に検知されていますが、これはポーリング間隔によるズレだと考えられます。 またコールバック関数の実行が完了した2022-10-09 10:53:38.538に再接続されています。

エッジ側の接続断を検知する仕組みを導入していた場合(AWSドキュメント|ライフサイクルイベント)、本ケースでも予約済みトピックに切断された対象デバイスとして通知されます。

この挙動を最初理解しておらず、エッジ側のネットワークは問題ないはずだが、断続的に接続断が発生するという事象が起きました。こちらもコールバック関数の実行方法を工夫することで対策が可能です。

解決策

表題の通り、サブスクライブ時に実行されるコールバック関数をすぐ終了させ、重たい処理は別スレッドで行います。課題2に関しては、keep-alive以内にコールバック関数が終了すれば解決できます。ただ課題1にアプローチするために以下の方法を利用します。

方法1

メモリが許す範囲で、パブリッシュされたイベントを全て捌きたい場合は、ワーカースレッドを使うのが良いと思います。

ソースコード
import utils.config as Config
import utils.logger as Logger
from awscrt import io, mqtt
from threading import Thread
import json
import time
from concurrent.futures import ThreadPoolExecutor

config = Config.getConfig()
Logger.init()

logger = Logger.getLogger(__name__)

AWS_IOT_ENDPOINT = config.IotEndpoint
# AWS IoTのサポートするkeep-alive 5 - 1200 s
KEEP_ALIVE_SECS = 5
# ms単位なので、KEEP_ALIVE_SECS * 1000 より短い必要がある
PING_TIMEOUT_MS = 1000
THING_NAME = "test-thing"

SUB_TOPIC_A = "sub/a"
SUB_TOPIC_B = "sub/b"


class PublishThread(Thread):
    def __init__(self, mqtt_connection=None, publish_topic_name=None):
        self.connection = mqtt_connection
        self.publish_topic_name = publish_topic_name
        Thread.__init__(self)

    def run(self):

        while True:
            timestamp = int(time.time())
            logger.info(f"publish {self.name} {timestamp}")
            self.connection.publish(
                topic=self.publish_topic_name,
                payload=json.dumps({f"{self.native_id}": timestamp}),
                qos=mqtt.QoS.AT_MOST_ONCE,
            )

            logger.info("publish done")

            time.sleep(1)


def createMQTTConnection(
    device_certificate_filepath: str,
    private_key_filepath: str,
    ca_certificate_filepath: str,
):
    event_loop_group = io.EventLoopGroup(1)
    host_resolver = io.DefaultHostResolver(event_loop_group)
    client_bootstrap = io.ClientBootstrap(event_loop_group, host_resolver)
    client_bootstrap = io.ClientBootstrap(event_loop_group, host_resolver)

    tls_ctx_options = io.TlsContextOptions.create_client_with_mtls_from_path(
        device_certificate_filepath, private_key_filepath
    )
    tls_ctx_options.override_default_trust_store_from_path(
        None, ca_certificate_filepath
    )
    tls_ctx = io.ClientTlsContext(tls_ctx_options)

    mqtt_client = mqtt.Client(client_bootstrap, tls_ctx)
    mqtt_connection = mqtt.Connection(
        client=mqtt_client,
        client_id=THING_NAME,
        host_name=AWS_IOT_ENDPOINT,
        port=8883,
        clean_session=False,
        keep_alive_secs=KEEP_ALIVE_SECS,
        ping_timeout_ms=PING_TIMEOUT_MS,
    )

    connect_future = mqtt_connection.connect()
    connect_future.result()

    return mqtt_connection


def sub_heavy_callback(topic, payload, dup, qos, retain):
    logger.info("call heavy func")
    logger.info(f"topic: {topic}, dup: {dup}, qos: {qos}, retain: {retain}")
    logger.info(f"payload: {payload.decode('utf-8')}")
    time.sleep(10)
    logger.info("end heavy func")


def sub_light_callback(topic, payload, dup, qos, retain):
    logger.info("call light func")
    logger.info(f"topic: {topic}, dup: {dup}, qos: {qos}, retain: {retain}")
    logger.info(f"payload: {payload.decode('utf-8')}")
    logger.info("end light func")


def main():
    logger.info("start main thread")
    logger.info("start create mqtt connection")
    connection = createMQTTConnection(
        device_certificate_filepath="./data/device_certificate.pem",
        private_key_filepath="./data/private.key",
        ca_certificate_filepath="./data/ca_certificate.pem",
    )
    logger.info("end create mqtt connection")

    MAX_WORKER = 3
    with ThreadPoolExecutor(
        max_workers=MAX_WORKER, thread_name_prefix="execSubCallbackThread"
    ) as executor:
        subTopic1Publisher = PublishThread(connection, SUB_TOPIC_A)
        subTopic1Publisher.daemon = True
        subTopic1Publisher.name = SUB_TOPIC_A
        subTopic1Publisher.start()

        subTopic2Publisher = PublishThread(connection, SUB_TOPIC_B)
        subTopic2Publisher.daemon = True
        subTopic2Publisher.name = SUB_TOPIC_B
        subTopic2Publisher.start()

        def sub_a_callbcak(topic, payload, dup, qos, retain):
            executor.submit(sub_heavy_callback, topic, payload, dup, qos, retain)

        def sub_b_callbcak(topic, payload, dup, qos, retain):
            executor.submit(sub_light_callback, topic, payload, dup, qos, retain)

        connection.subscribe(
            topic=SUB_TOPIC_A, qos=mqtt.QoS.AT_LEAST_ONCE, callback=sub_a_callbcak
        )
        connection.subscribe(
            topic=SUB_TOPIC_B, qos=mqtt.QoS.AT_LEAST_ONCE, callback=sub_b_callbcak
        )

        while True:
            time.sleep(10)


if __name__ == "__main__":
    main()

当然ですが、指定したワーカースレッドの数を超えた並列数は実行できません。わかりやすいように3つワーカースレッドを立てた場合以下の通りです。

実行ログ
$ poetry run python3 ./src/mqtt-sub-callback-1.py
2022-10-10 11:25:06,299 __main__ [INFO] main [MainThread] (3069313088): start main thread
2022-10-10 11:25:06,299 __main__ [INFO] main [MainThread] (3069313088): start create mqtt connection
2022-10-10 11:25:06,618 __main__ [INFO] main [MainThread] (3069313088): end create mqtt connection
2022-10-10 11:25:06,619 __main__ [INFO] run [sub/a] (3037721664): publish sub/a 1665368706
2022-10-10 11:25:06,619 __main__ [INFO] run [sub/a] (3037721664): publish done
2022-10-10 11:25:06,620 __main__ [INFO] run [sub/b] (3027235904): publish sub/b 1665368706
2022-10-10 11:25:06,621 __main__ [INFO] run [sub/b] (3027235904): publish done
2022-10-10 11:25:06,655 __main__ [INFO] sub_heavy_callback [execSubCallbackThread_0] (3016750144): call heavy func
2022-10-10 11:25:06,655 __main__ [INFO] sub_heavy_callback [execSubCallbackThread_0] (3016750144): topic: sub/a, dup: False, qos: 0, retain: False
2022-10-10 11:25:06,655 __main__ [INFO] sub_heavy_callback [execSubCallbackThread_0] (3016750144): payload: {"6556": 1665368706}
2022-10-10 11:25:06,681 __main__ [INFO] sub_light_callback [execSubCallbackThread_1] (3006264384): call light func
2022-10-10 11:25:06,681 __main__ [INFO] sub_light_callback [execSubCallbackThread_1] (3006264384): topic: sub/b, dup: False, qos: 0, retain: False
2022-10-10 11:25:06,682 __main__ [INFO] sub_light_callback [execSubCallbackThread_1] (3006264384): payload: {"6557": 1665368706}
2022-10-10 11:25:06,682 __main__ [INFO] sub_light_callback [execSubCallbackThread_1] (3006264384): end light func
2022-10-10 11:25:07,621 __main__ [INFO] run [sub/a] (3037721664): publish sub/a 1665368707
2022-10-10 11:25:07,622 __main__ [INFO] run [sub/a] (3037721664): publish done
2022-10-10 11:25:07,622 __main__ [INFO] run [sub/b] (3027235904): publish sub/b 1665368707
2022-10-10 11:25:07,623 __main__ [INFO] run [sub/b] (3027235904): publish done
2022-10-10 11:25:07,645 __main__ [INFO] sub_heavy_callback [execSubCallbackThread_1] (3006264384): call heavy func
2022-10-10 11:25:07,645 __main__ [INFO] sub_heavy_callback [execSubCallbackThread_1] (3006264384): topic: sub/a, dup: False, qos: 0, retain: False
2022-10-10 11:25:07,646 __main__ [INFO] sub_heavy_callback [execSubCallbackThread_1] (3006264384): payload: {"6556": 1665368707}
2022-10-10 11:25:07,673 __main__ [INFO] sub_light_callback [execSubCallbackThread_2] (2995778624): call light func
2022-10-10 11:25:07,674 __main__ [INFO] sub_light_callback [execSubCallbackThread_2] (2995778624): topic: sub/b, dup: False, qos: 0, retain: False
2022-10-10 11:25:07,674 __main__ [INFO] sub_light_callback [execSubCallbackThread_2] (2995778624): payload: {"6557": 1665368707}
2022-10-10 11:25:07,674 __main__ [INFO] sub_light_callback [execSubCallbackThread_2] (2995778624): end light func
2022-10-10 11:25:08,623 __main__ [INFO] run [sub/a] (3037721664): publish sub/a 1665368708
2022-10-10 11:25:08,624 __main__ [INFO] run [sub/a] (3037721664): publish done
2022-10-10 11:25:08,624 __main__ [INFO] run [sub/b] (3027235904): publish sub/b 1665368708
2022-10-10 11:25:08,625 __main__ [INFO] run [sub/b] (3027235904): publish done
2022-10-10 11:25:08,653 __main__ [INFO] sub_heavy_callback [execSubCallbackThread_2] (2995778624): call heavy func
2022-10-10 11:25:08,654 __main__ [INFO] sub_heavy_callback [execSubCallbackThread_2] (2995778624): topic: sub/a, dup: False, qos: 0, retain: False
2022-10-10 11:25:08,654 __main__ [INFO] sub_heavy_callback [execSubCallbackThread_2] (2995778624): payload: {"6556": 1665368708}
2022-10-10 11:25:09,625 __main__ [INFO] run [sub/a] (3037721664): publish sub/a 1665368709
2022-10-10 11:25:09,626 __main__ [INFO] run [sub/a] (3037721664): publish done
2022-10-10 11:25:09,626 __main__ [INFO] run [sub/b] (3027235904): publish sub/b 1665368709
2022-10-10 11:25:09,627 __main__ [INFO] run [sub/b] (3027235904): publish done
2022-10-10 11:25:10,627 __main__ [INFO] run [sub/a] (3037721664): publish sub/a 1665368710
2022-10-10 11:25:10,628 __main__ [INFO] run [sub/b] (3027235904): publish sub/b 1665368710
2022-10-10 11:25:10,628 __main__ [INFO] run [sub/a] (3037721664): publish done
2022-10-10 11:25:10,629 __main__ [INFO] run [sub/b] (3027235904): publish done
2022-10-10 11:25:11,631 __main__ [INFO] run [sub/a] (3037721664): publish sub/a 1665368711
2022-10-10 11:25:11,632 __main__ [INFO] run [sub/b] (3027235904): publish sub/b 1665368711
2022-10-10 11:25:11,633 __main__ [INFO] run [sub/a] (3037721664): publish done
2022-10-10 11:25:11,634 __main__ [INFO] run [sub/b] (3027235904): publish done
2022-10-10 11:25:12,635 __main__ [INFO] run [sub/a] (3037721664): publish sub/a 1665368712
2022-10-10 11:25:12,636 __main__ [INFO] run [sub/b] (3027235904): publish sub/b 1665368712
2022-10-10 11:25:12,636 __main__ [INFO] run [sub/a] (3037721664): publish done
2022-10-10 11:25:12,637 __main__ [INFO] run [sub/b] (3027235904): publish done
2022-10-10 11:25:13,639 __main__ [INFO] run [sub/a] (3037721664): publish sub/a 1665368713
2022-10-10 11:25:13,640 __main__ [INFO] run [sub/b] (3027235904): publish sub/b 1665368713
2022-10-10 11:25:13,640 __main__ [INFO] run [sub/a] (3037721664): publish done
2022-10-10 11:25:13,641 __main__ [INFO] run [sub/b] (3027235904): publish done
2022-10-10 11:25:14,642 __main__ [INFO] run [sub/a] (3037721664): publish sub/a 1665368714
2022-10-10 11:25:14,643 __main__ [INFO] run [sub/b] (3027235904): publish sub/b 1665368714
2022-10-10 11:25:14,643 __main__ [INFO] run [sub/a] (3037721664): publish done
2022-10-10 11:25:14,644 __main__ [INFO] run [sub/b] (3027235904): publish done
2022-10-10 11:25:15,646 __main__ [INFO] run [sub/a] (3037721664): publish sub/a 1665368715
2022-10-10 11:25:15,646 __main__ [INFO] run [sub/b] (3027235904): publish sub/b 1665368715
2022-10-10 11:25:15,646 __main__ [INFO] run [sub/a] (3037721664): publish done
2022-10-10 11:25:15,647 __main__ [INFO] run [sub/b] (3027235904): publish done
2022-10-10 11:25:16,649 __main__ [INFO] run [sub/a] (3037721664): publish sub/a 1665368716
2022-10-10 11:25:16,649 __main__ [INFO] run [sub/b] (3027235904): publish sub/b 1665368716
2022-10-10 11:25:16,650 __main__ [INFO] run [sub/a] (3037721664): publish done
2022-10-10 11:25:16,651 __main__ [INFO] run [sub/b] (3027235904): publish done
2022-10-10 11:25:16,657 __main__ [INFO] sub_heavy_callback [execSubCallbackThread_0] (3016750144): end heavy func
2022-10-10 11:25:16,658 __main__ [INFO] sub_light_callback [execSubCallbackThread_0] (3016750144): call light func
2022-10-10 11:25:16,658 __main__ [INFO] sub_light_callback [execSubCallbackThread_0] (3016750144): topic: sub/b, dup: False, qos: 0, retain: False
2022-10-10 11:25:16,659 __main__ [INFO] sub_light_callback [execSubCallbackThread_0] (3016750144): payload: {"6557": 1665368708}
2022-10-10 11:25:16,659 __main__ [INFO] sub_light_callback [execSubCallbackThread_0] (3016750144): end light func
2022-10-10 11:25:16,659 __main__ [INFO] sub_heavy_callback [execSubCallbackThread_0] (3016750144): call heavy func
2022-10-10 11:25:16,659 __main__ [INFO] sub_heavy_callback [execSubCallbackThread_0] (3016750144): topic: sub/a, dup: False, qos: 0, retain: False
2022-10-10 11:25:16,660 __main__ [INFO] sub_heavy_callback [execSubCallbackThread_0] (3016750144): payload: {"6556": 1665368709}

sub/a sub/b 両方ともpublishされたメッセージを捌けていますが、3つのスレッドが使用中になると後続のコールバック関数の実行はブロックされます。 ワーカスレッドが空き次第、次のタスクがスケジュールされます。

方法2

前述のやり方ですと、負荷を読みきれなかったケースでキューが伸びてメモリ枯渇->最悪OOM Killしてしまうケースがあります。サブスクライブするトピック毎にスレッドを立てて、処理中は後続のイベントは捨てることが許容できれば以下のような実装でも良いと思います。

ソースコード
import json
import time
from threading import Thread

from awscrt import io, mqtt

import utils.config as Config
import utils.logger as Logger

config = Config.getConfig()
Logger.init()

logger = Logger.getLogger(__name__)

AWS_IOT_ENDPOINT = config.IotEndpoint
# AWS IoTのサポートするkeep-alive 5 - 1200 s
KEEP_ALIVE_SECS = 5
# ms単位なので、KEEP_ALIVE_SECS * 1000 より短い必要がある
PING_TIMEOUT_MS = 1000
THING_NAME = "test-thing"

SUB_TOPIC_A = "sub/a"
SUB_TOPIC_B = "sub/b"


class PublishThread(Thread):
    def __init__(self, mqtt_connection=None, publish_topic_name=None):
        self.connection = mqtt_connection
        self.publish_topic_name = publish_topic_name
        Thread.__init__(self)

    def run(self):

        while True:
            timestamp = int(time.time())
            logger.info(f"publish {self.name} {timestamp}")
            self.connection.publish(
                topic=self.publish_topic_name,
                payload=json.dumps({f"{self.native_id}": timestamp}),
                qos=mqtt.QoS.AT_MOST_ONCE,
            )

            logger.info("publish done")

            time.sleep(1)


def createMQTTConnection(
    device_certificate_filepath: str,
    private_key_filepath: str,
    ca_certificate_filepath: str,
):
    event_loop_group = io.EventLoopGroup(1)
    host_resolver = io.DefaultHostResolver(event_loop_group)
    client_bootstrap = io.ClientBootstrap(event_loop_group, host_resolver)
    client_bootstrap = io.ClientBootstrap(event_loop_group, host_resolver)

    tls_ctx_options = io.TlsContextOptions.create_client_with_mtls_from_path(
        device_certificate_filepath, private_key_filepath
    )
    tls_ctx_options.override_default_trust_store_from_path(
        None, ca_certificate_filepath
    )
    tls_ctx = io.ClientTlsContext(tls_ctx_options)

    mqtt_client = mqtt.Client(client_bootstrap, tls_ctx)
    mqtt_connection = mqtt.Connection(
        client=mqtt_client,
        client_id=THING_NAME,
        host_name=AWS_IOT_ENDPOINT,
        port=8883,
        clean_session=False,
        keep_alive_secs=KEEP_ALIVE_SECS,
        ping_timeout_ms=PING_TIMEOUT_MS,
    )

    connect_future = mqtt_connection.connect()
    connect_future.result()

    return mqtt_connection


def sub_heavy_callback(topic, payload, dup, qos, retain):
    logger.info("call heavy func")
    logger.info(f"topic: {topic}, dup: {dup}, qos: {qos}, retain: {retain}")
    logger.info(f"payload: {payload.decode('utf-8')}")
    time.sleep(10)
    logger.info("end heavy func")


def sub_light_callback(topic, payload, dup, qos, retain):
    logger.info("call light func")
    logger.info(f"topic: {topic}, dup: {dup}, qos: {qos}, retain: {retain}")
    logger.info(f"payload: {payload.decode('utf-8')}")
    logger.info("end light func")


class SubExecThread(Thread):
    def __init__(self, callback):
        self.__is_using = False
        self.__callback = callback
        self.__args = []

        Thread.__init__(self)

    def add_task(self, topic, payload, dup, qos, retain):
        if self.__is_using is False:
            self.__is_using = True
            self.__args = [topic, payload, dup, qos, retain]
        else:
            logger.info(
                f"skip event: topic: {topic}, dup: {dup}, qos: {qos}, retain: {retain}, payload: {payload.decode('utf-8')}"
            )

    def run(self):
        while True:
            if self.__is_using is True:
                logger.info(self.__args)
                self.__callback(
                    topic=self.__args[0],
                    payload=self.__args[1],
                    dup=self.__args[2],
                    qos=self.__args[3],
                    retain=self.__args[4],
                )
                self.__is_using = False

            time.sleep(0.1)


def main():
    logger.info("start main thread")
    logger.info("start create mqtt connection")
    connection = createMQTTConnection(
        device_certificate_filepath="./data/device_certificate.pem",
        private_key_filepath="./data/private.key",
        ca_certificate_filepath="./data/ca_certificate.pem",
    )
    logger.info("end create mqtt connection")

    subTopic1Publisher = PublishThread(connection, SUB_TOPIC_A)
    subTopic1Publisher.daemon = True
    subTopic1Publisher.name = SUB_TOPIC_A
    subTopic1Publisher.start()

    subTopic2Publisher = PublishThread(connection, SUB_TOPIC_B)
    subTopic2Publisher.daemon = True
    subTopic2Publisher.name = SUB_TOPIC_B
    subTopic2Publisher.start()

    subaExecThread = SubExecThread(sub_heavy_callback)
    subaExecThread.daemon = True
    subaExecThread.name = "subaExecThread"
    subaExecThread.start()

    subbExecThread = SubExecThread(sub_light_callback)
    subbExecThread.daemon = True
    subbExecThread.name = "subbExecThread"
    subbExecThread.start()

    connection.subscribe(
        topic=SUB_TOPIC_A,
        qos=mqtt.QoS.AT_LEAST_ONCE,
        callback=subaExecThread.add_task,
    )
    connection.subscribe(
        topic=SUB_TOPIC_B,
        qos=mqtt.QoS.AT_LEAST_ONCE,
        callback=subbExecThread.add_task,
    )

    while True:
        time.sleep(10)


if __name__ == "__main__":
    main()

実行ログは以下の通りです。

実行ログ
$ poetry run python3 ./src/mqtt-sub-callback-2.py
2022-10-10 12:17:57,269 __main__ [INFO] main [MainThread] (3069907008): start main thread
2022-10-10 12:17:57,270 __main__ [INFO] main [MainThread] (3069907008): start create mqtt connection
2022-10-10 12:17:57,577 __main__ [INFO] main [MainThread] (3069907008): end create mqtt connection
2022-10-10 12:17:57,578 __main__ [INFO] run [sub/a] (3038770240): publish sub/a 1665371877
2022-10-10 12:17:57,579 __main__ [INFO] run [sub/a] (3038770240): publish done
2022-10-10 12:17:57,579 __main__ [INFO] run [sub/b] (3028284480): publish sub/b 1665371877
2022-10-10 12:17:57,581 __main__ [INFO] run [sub/b] (3028284480): publish done
2022-10-10 12:17:57,680 __main__ [INFO] sub_heavy_callback [subaExecThread] (3017798720): call heavy func
2022-10-10 12:17:57,681 __main__ [INFO] sub_heavy_callback [subaExecThread] (3017798720): topic: sub/a, dup: False, qos: 0, retain: False
2022-10-10 12:17:57,681 __main__ [INFO] sub_heavy_callback [subaExecThread] (3017798720): payload: {"8168": 1665371877}
2022-10-10 12:17:57,681 __main__ [INFO] sub_light_callback [subbExecThread] (3007312960): call light func
2022-10-10 12:17:57,682 __main__ [INFO] sub_light_callback [subbExecThread] (3007312960): topic: sub/b, dup: False, qos: 0, retain: False
2022-10-10 12:17:57,682 __main__ [INFO] sub_light_callback [subbExecThread] (3007312960): payload: {"8169": 1665371877}
2022-10-10 12:17:57,682 __main__ [INFO] sub_light_callback [subbExecThread] (3007312960): end light func
2022-10-10 12:17:58,581 __main__ [INFO] run [sub/a] (3038770240): publish sub/a 1665371878
2022-10-10 12:17:58,581 __main__ [INFO] run [sub/a] (3038770240): publish done
2022-10-10 12:17:58,582 __main__ [INFO] run [sub/b] (3028284480): publish sub/b 1665371878
2022-10-10 12:17:58,582 __main__ [INFO] run [sub/b] (3028284480): publish done
2022-10-10 12:17:58,628 __main__ [INFO] add_task [Dummy-5] (3059610688): skip event: topic: sub/a, dup: False, qos: 0, retain: False, payload: {"8168": 1665371878}
2022-10-10 12:17:58,684 __main__ [INFO] sub_light_callback [subbExecThread] (3007312960): call light func
2022-10-10 12:17:58,684 __main__ [INFO] sub_light_callback [subbExecThread] (3007312960): topic: sub/b, dup: False, qos: 0, retain: False
2022-10-10 12:17:58,685 __main__ [INFO] sub_light_callback [subbExecThread] (3007312960): payload: {"8169": 1665371878}
2022-10-10 12:17:58,685 __main__ [INFO] sub_light_callback [subbExecThread] (3007312960): end light func
2022-10-10 12:18:07,597 __main__ [INFO] run [sub/a] (3038770240): publish sub/a 1665371887
(中略)
2022-10-10 12:18:07,598 __main__ [INFO] run [sub/a] (3038770240): publish done
2022-10-10 12:18:07,599 __main__ [INFO] run [sub/b] (3028284480): publish sub/b 1665371887
2022-10-10 12:18:07,600 __main__ [INFO] run [sub/b] (3028284480): publish done
2022-10-10 12:18:07,635 __main__ [INFO] add_task [Dummy-5] (3059610688): skip event: topic: sub/a, dup: False, qos: 0, retain: False, payload: {"8168": 1665371887}
2022-10-10 12:18:07,681 __main__ [INFO] sub_heavy_callback [subaExecThread] (3017798720): end heavy func
2022-10-10 12:18:07,708 __main__ [INFO] sub_light_callback [subbExecThread] (3007312960): call light func
2022-10-10 12:18:07,709 __main__ [INFO] sub_light_callback [subbExecThread] (3007312960): topic: sub/b, dup: False, qos: 0, retain: False
2022-10-10 12:18:07,709 __main__ [INFO] sub_light_callback [subbExecThread] (3007312960): payload: {"8169": 1665371887}
2022-10-10 12:18:07,709 __main__ [INFO] sub_light_callback [subbExecThread] (3007312960): end light func
2022-10-10 12:18:08,599 __main__ [INFO] run [sub/a] (3038770240): publish sub/a 1665371888
2022-10-10 12:18:08,600 __main__ [INFO] run [sub/a] (3038770240): publish done
2022-10-10 12:18:08,601 __main__ [INFO] run [sub/b] (3028284480): publish sub/b 1665371888
2022-10-10 12:18:08,602 __main__ [INFO] run [sub/b] (3028284480): publish done
2022-10-10 12:18:08,683 __main__ [INFO] sub_heavy_callback [subaExecThread] (3017798720): call heavy func
2022-10-10 12:18:08,684 __main__ [INFO] sub_heavy_callback [subaExecThread] (3017798720): topic: sub/a, dup: False, qos: 0, retain: False
2022-10-10 12:18:08,684 __main__ [INFO] sub_heavy_callback [subaExecThread] (3017798720): payload: {"8168": 1665371888}
2022-10-10 12:18:08,911 __main__ [INFO] sub_light_callback [subbExecThread] (3007312960): call light func
2022-10-10 12:18:08,912 __main__ [INFO] sub_light_callback [subbExecThread] (3007312960): topic: sub/b, dup: False, qos: 0, retain: False
2022-10-10 12:18:08,912 __main__ [INFO] sub_light_callback [subbExecThread] (3007312960): payload: {"8169": 1665371888}
2022-10-10 12:18:08,912 __main__ [INFO] sub_light_callback [subbExecThread] (3007312960): end light func

本コードの場合、サブスクライブスレッド毎にコールバック関数の実行が分離されているため、sub/aのイベントが詰まっても sub/bのイベントを正常に処理できます。 また実行中はイベントを捨てることで、キューが伸びることもありません。

以上の通り、実現したい要件や利用できるメモリ量を考慮して方法を検討して見ると良いかもしれません。方法1と方法2の良いところを組み合わせて利用するのも良いと思います。

オフラインパブリッシュ対策をする(QoS=1の場合)

以下の条件でパブリッシュした場合、パブリッシュした内容を送信元でもキャンセルすることはできません。

  • オフライン状態(AWS IoTエンドポイントに到達できない状態を指す)
  • aws-iot-device-sdk-python-v2を利用
  • QoS=1でパブリッシュ

再接続時にオフライン時にパブリッシュした内容が一斉に再送されます。QoS=1で送信しているのでPUBACKが返却されるまで再送し続けるので当然かもしれませんが、送信側で滞留したメッセージの対策が必要です。

オフラインパブリッシュ仕様に関しては、issueとしてconfigure offline publishing in v2が起票されています。v1であるaws-iot-device-sdk-pythonでは詳細に設定できたことが読み取れます。オフラインパブリッシュ仕様に関してはv1のソースを読めば別の回避策が見つかるかもしれません。

事象/再現コード

コードは以下の通りです。

ソースコード
import json
import time
from threading import Thread
from concurrent import futures

from awscrt import io, mqtt

import utils.config as Config
import utils.logger as Logger

config = Config.getConfig()
Logger.init()

logger = Logger.getLogger(__name__)

AWS_IOT_ENDPOINT = config.IotEndpoint
KEEP_ALIVE_SECS = 5
PING_TIMEOUT_MS = 1000
THING_NAME = "test-thing"

SUB_TOPIC_A = "sub/a"


class PublishThread(Thread):
    def __init__(self, mqtt_connection=None, publish_topic_name=None):
        self.connection = mqtt_connection
        self.publish_topic_name = publish_topic_name
        Thread.__init__(self)

    def run(self):
        while True:
            try:
                timestamp = int(time.time())
                logger.info(f"publish {self.name} {timestamp}")
                publish_future, _ = self.connection.publish(
                    topic=self.publish_topic_name,
                    payload=json.dumps({f"{self.native_id}": timestamp}),
                    qos=mqtt.QoS.AT_LEAST_ONCE,
                )


                logger.info("publish done")

                time.sleep(1)
            except Exception as e:
                logger.info(f"publish error: {e}", exc_info=True)


def sub_callback(topic, payload, dup, qos, retain):
    logger.info("call sub_callback")
    logger.info(f"topic: {topic}, dup: {dup}, qos: {qos}, retain: {retain}")
    logger.info(f"payload: {payload.decode('utf-8')}")
    logger.info("end sub_callback")


def createMQTTConnection(
    device_certificate_filepath: str,
    private_key_filepath: str,
    ca_certificate_filepath: str,
):
    event_loop_group = io.EventLoopGroup(1)
    host_resolver = io.DefaultHostResolver(event_loop_group)
    client_bootstrap = io.ClientBootstrap(event_loop_group, host_resolver)
    client_bootstrap = io.ClientBootstrap(event_loop_group, host_resolver)

    tls_ctx_options = io.TlsContextOptions.create_client_with_mtls_from_path(
        device_certificate_filepath, private_key_filepath
    )
    tls_ctx_options.override_default_trust_store_from_path(
        None, ca_certificate_filepath
    )
    tls_ctx = io.ClientTlsContext(tls_ctx_options)

    mqtt_client = mqtt.Client(client_bootstrap, tls_ctx)
    mqtt_connection = mqtt.Connection(
        client=mqtt_client,
        client_id=THING_NAME,
        host_name=AWS_IOT_ENDPOINT,
        port=8883,
        clean_session=False,
        keep_alive_secs=KEEP_ALIVE_SECS,
        ping_timeout_ms=PING_TIMEOUT_MS,
    )

    connect_future = mqtt_connection.connect()
    connect_future.result()

    return mqtt_connection


def main():
    logger.info("start main thread")
    logger.info("start create mqtt connection")
    connection = createMQTTConnection(
        device_certificate_filepath="./data/device_certificate.pem",
        private_key_filepath="./data/private.key",
        ca_certificate_filepath="./data/ca_certificate.pem",
    )
    logger.info("end create mqtt connection")

    subTopic1Publisher = PublishThread(connection, SUB_TOPIC_A)
    subTopic1Publisher.daemon = True
    subTopic1Publisher.name = SUB_TOPIC_A
    subTopic1Publisher.start()

    connection.subscribe(
        topic=SUB_TOPIC_A, qos=mqtt.QoS.AT_LEAST_ONCE, callback=sub_callback
    )

    while True:
        time.sleep(10)


if __name__ == "__main__":
    main()

ネットワークをOFFしタイムスタンプを出力するコマンドは以下の通りです。

# Raspberry Pi OSの場合
sudo ifconfig wlan0 down;date +%s
# Macの場合
networksetup -SetAirportPower en0 off;date +%s
実行ログ

wifiをOFF->ONしたタイムスタンプ

$ networksetup -SetAirportPower en0 off;date +%s
1665442923

$ networksetup -SetAirportPower en0 on;date +%s
1665442931

ソースのログ

$ poetry run python3 ./src/offline-publish.py
2022-10-11 08:02:01,255 __main__ [INFO] main [MainThread] (4335748480): start main thread
2022-10-11 08:02:01,255 __main__ [INFO] main [MainThread] (4335748480): start create mqtt connection
2022-10-11 08:02:01,583 __main__ [INFO] main [MainThread] (4335748480): end create mqtt connection
2022-10-11 08:02:01,583 __main__ [INFO] run [sub/a] (6153482240): publish sub/a 1665442921
2022-10-11 08:02:01,584 __main__ [INFO] run [sub/a] (6153482240): publish done
2022-10-11 08:02:01,641 __main__ [INFO] sub_callback [Dummy-2] (6134935552): call sub_callback
2022-10-11 08:02:01,641 __main__ [INFO] sub_callback [Dummy-2] (6134935552): topic: sub/a, dup: False, qos: 1, retain: False
2022-10-11 08:02:01,641 __main__ [INFO] sub_callback [Dummy-2] (6134935552): payload: {"2000558": 1665442921}
2022-10-11 08:02:01,641 __main__ [INFO] sub_callback [Dummy-2] (6134935552): end sub_callback
2022-10-11 08:02:02,589 __main__ [INFO] run [sub/a] (6153482240): publish sub/a 1665442922
2022-10-11 08:02:02,589 __main__ [INFO] run [sub/a] (6153482240): publish done
2022-10-11 08:02:02,647 __main__ [INFO] sub_callback [Dummy-2] (6134935552): call sub_callback
2022-10-11 08:02:02,647 __main__ [INFO] sub_callback [Dummy-2] (6134935552): topic: sub/a, dup: False, qos: 1, retain: False
2022-10-11 08:02:02,647 __main__ [INFO] sub_callback [Dummy-2] (6134935552): payload: {"2000558": 1665442922}
2022-10-11 08:02:02,647 __main__ [INFO] sub_callback [Dummy-2] (6134935552): end sub_callback
2022-10-11 08:02:03,594 __main__ [INFO] run [sub/a] (6153482240): publish sub/a 1665442923
2022-10-11 08:02:03,595 __main__ [INFO] run [sub/a] (6153482240): publish done
2022-10-11 08:02:04,599 __main__ [INFO] run [sub/a] (6153482240): publish sub/a 1665442924
2022-10-11 08:02:04,599 __main__ [INFO] run [sub/a] (6153482240): publish done
2022-10-11 08:02:05,600 __main__ [INFO] run [sub/a] (6153482240): publish sub/a 1665442925
2022-10-11 08:02:05,600 __main__ [INFO] run [sub/a] (6153482240): publish done
2022-10-11 08:02:06,603 __main__ [INFO] run [sub/a] (6153482240): publish sub/a 1665442926
2022-10-11 08:02:06,603 __main__ [INFO] run [sub/a] (6153482240): publish done
2022-10-11 08:02:07,609 __main__ [INFO] run [sub/a] (6153482240): publish sub/a 1665442927
2022-10-11 08:02:07,609 __main__ [INFO] run [sub/a] (6153482240): publish done
2022-10-11 08:02:08,613 __main__ [INFO] run [sub/a] (6153482240): publish sub/a 1665442928
2022-10-11 08:02:08,613 __main__ [INFO] run [sub/a] (6153482240): publish done
2022-10-11 08:02:09,618 __main__ [INFO] run [sub/a] (6153482240): publish sub/a 1665442929
2022-10-11 08:02:09,619 __main__ [INFO] run [sub/a] (6153482240): publish done
2022-10-11 08:02:10,624 __main__ [INFO] run [sub/a] (6153482240): publish sub/a 1665442930
2022-10-11 08:02:10,624 __main__ [INFO] run [sub/a] (6153482240): publish done
2022-10-11 08:02:11,629 __main__ [INFO] run [sub/a] (6153482240): publish sub/a 1665442931
2022-10-11 08:02:11,629 __main__ [INFO] run [sub/a] (6153482240): publish done
2022-10-11 08:02:12,635 __main__ [INFO] run [sub/a] (6153482240): publish sub/a 1665442932
2022-10-11 08:02:12,635 __main__ [INFO] run [sub/a] (6153482240): publish done
2022-10-11 08:02:13,019 __main__ [INFO] sub_callback [Dummy-2] (6134935552): call sub_callback
2022-10-11 08:02:13,019 __main__ [INFO] sub_callback [Dummy-2] (6134935552): topic: sub/a, dup: False, qos: 1, retain: False
2022-10-11 08:02:13,019 __main__ [INFO] sub_callback [Dummy-2] (6134935552): payload: {"2000558": 1665442931}
2022-10-11 08:02:13,019 __main__ [INFO] sub_callback [Dummy-2] (6134935552): end sub_callback
2022-10-11 08:02:13,019 __main__ [INFO] sub_callback [Dummy-2] (6134935552): call sub_callback
2022-10-11 08:02:13,019 __main__ [INFO] sub_callback [Dummy-2] (6134935552): topic: sub/a, dup: False, qos: 1, retain: False
2022-10-11 08:02:13,019 __main__ [INFO] sub_callback [Dummy-2] (6134935552): payload: {"2000558": 1665442924}
2022-10-11 08:02:13,019 __main__ [INFO] sub_callback [Dummy-2] (6134935552): end sub_callback
2022-10-11 08:02:13,019 __main__ [INFO] sub_callback [Dummy-2] (6134935552): call sub_callback
2022-10-11 08:02:13,019 __main__ [INFO] sub_callback [Dummy-2] (6134935552): topic: sub/a, dup: False, qos: 1, retain: False
2022-10-11 08:02:13,019 __main__ [INFO] sub_callback [Dummy-2] (6134935552): payload: {"2000558": 1665442932}
2022-10-11 08:02:13,019 __main__ [INFO] sub_callback [Dummy-2] (6134935552): end sub_callback
2022-10-11 08:02:13,019 __main__ [INFO] sub_callback [Dummy-2] (6134935552): call sub_callback
2022-10-11 08:02:13,019 __main__ [INFO] sub_callback [Dummy-2] (6134935552): topic: sub/a, dup: False, qos: 1, retain: False
2022-10-11 08:02:13,019 __main__ [INFO] sub_callback [Dummy-2] (6134935552): payload: {"2000558": 1665442928}
2022-10-11 08:02:13,020 __main__ [INFO] sub_callback [Dummy-2] (6134935552): end sub_callback
2022-10-11 08:02:13,024 __main__ [INFO] sub_callback [Dummy-2] (6134935552): call sub_callback
2022-10-11 08:02:13,024 __main__ [INFO] sub_callback [Dummy-2] (6134935552): topic: sub/a, dup: False, qos: 1, retain: False
2022-10-11 08:02:13,024 __main__ [INFO] sub_callback [Dummy-2] (6134935552): payload: {"2000558": 1665442926}
2022-10-11 08:02:13,024 __main__ [INFO] sub_callback [Dummy-2] (6134935552): end sub_callback
2022-10-11 08:02:13,025 __main__ [INFO] sub_callback [Dummy-2] (6134935552): call sub_callback
2022-10-11 08:02:13,025 __main__ [INFO] sub_callback [Dummy-2] (6134935552): topic: sub/a, dup: False, qos: 1, retain: False
2022-10-11 08:02:13,025 __main__ [INFO] sub_callback [Dummy-2] (6134935552): payload: {"2000558": 1665442925}
2022-10-11 08:02:13,025 __main__ [INFO] sub_callback [Dummy-2] (6134935552): end sub_callback
2022-10-11 08:02:13,028 __main__ [INFO] sub_callback [Dummy-2] (6134935552): call sub_callback
2022-10-11 08:02:13,028 __main__ [INFO] sub_callback [Dummy-2] (6134935552): topic: sub/a, dup: False, qos: 1, retain: False
2022-10-11 08:02:13,028 __main__ [INFO] sub_callback [Dummy-2] (6134935552): payload: {"2000558": 1665442930}
2022-10-11 08:02:13,028 __main__ [INFO] sub_callback [Dummy-2] (6134935552): end sub_callback
2022-10-11 08:02:13,029 __main__ [INFO] sub_callback [Dummy-2] (6134935552): call sub_callback
2022-10-11 08:02:13,029 __main__ [INFO] sub_callback [Dummy-2] (6134935552): topic: sub/a, dup: False, qos: 1, retain: False
2022-10-11 08:02:13,029 __main__ [INFO] sub_callback [Dummy-2] (6134935552): payload: {"2000558": 1665442923}
2022-10-11 08:02:13,030 __main__ [INFO] sub_callback [Dummy-2] (6134935552): end sub_callback
2022-10-11 08:02:13,031 __main__ [INFO] sub_callback [Dummy-2] (6134935552): call sub_callback
2022-10-11 08:02:13,031 __main__ [INFO] sub_callback [Dummy-2] (6134935552): topic: sub/a, dup: False, qos: 1, retain: False
2022-10-11 08:02:13,031 __main__ [INFO] sub_callback [Dummy-2] (6134935552): payload: {"2000558": 1665442927}
2022-10-11 08:02:13,031 __main__ [INFO] sub_callback [Dummy-2] (6134935552): end sub_callback
2022-10-11 08:02:13,033 __main__ [INFO] sub_callback [Dummy-2] (6134935552): call sub_callback
2022-10-11 08:02:13,034 __main__ [INFO] sub_callback [Dummy-2] (6134935552): topic: sub/a, dup: False, qos: 1, retain: False
2022-10-11 08:02:13,034 __main__ [INFO] sub_callback [Dummy-2] (6134935552): payload: {"2000558": 1665442929}
2022-10-11 08:02:13,034 __main__ [INFO] sub_callback [Dummy-2] (6134935552): end sub_callback
2022-10-11 08:02:13,640 __main__ [INFO] run [sub/a] (6153482240): publish sub/a 1665442933
2022-10-11 08:02:13,641 __main__ [INFO] run [sub/a] (6153482240): publish done
2022-10-11 08:02:13,708 __main__ [INFO] sub_callback [Dummy-2] (6134935552): call sub_callback
2022-10-11 08:02:13,708 __main__ [INFO] sub_callback [Dummy-2] (6134935552): topic: sub/a, dup: False, qos: 1, retain: False
2022-10-11 08:02:13,708 __main__ [INFO] sub_callback [Dummy-2] (6134935552): payload: {"2000558": 1665442933}
2022-10-11 08:02:13,708 __main__ [INFO] sub_callback [Dummy-2] (6134935552): end sub_callback
2022-10-11 08:02:14,646 __main__ [INFO] run [sub/a] (6153482240): publish sub/a 1665442934
2022-10-11 08:02:14,646 __main__ [INFO] run [sub/a] (6153482240): publish done
2022-10-11 08:02:14,712 __main__ [INFO] sub_callback [Dummy-2] (6134935552): call sub_callback
2022-10-11 08:02:14,712 __main__ [INFO] sub_callback [Dummy-2] (6134935552): topic: sub/a, dup: False, qos: 1, retain: False
2022-10-11 08:02:14,712 __main__ [INFO] sub_callback [Dummy-2] (6134935552): payload: {"2000558": 1665442934}
2022-10-11 08:02:14,712 __main__ [INFO] sub_callback [Dummy-2] (6134935552): end sub_callback
2022-10-11 08:02:15,649 __main__ [INFO] run [sub/a] (6153482240): publish sub/a 1665442935
2022-10-11 08:02:15,650 __main__ [INFO] run [sub/a] (6153482240): publish done

ログより、ネットワークが復旧したと思われる08:02:13,019にオフライン時にパブリッシュした1665442923から1665442932のペイロードがほぼ同時に順不同でパブリッシュされていることが分かります。 これは一定時間大量のエッジ側でQoS=1でオフラインパブリッシュが発生した場合、再接続時にクラウド側のリソースのスロットリングが発生する可能性があり、対策が必要です。

解決策

解決策としては以下の通りです。

  • QoS=0で送信する
  • (負荷が許容できる場合)送信元でパブリッシュするメッセージにタイムスタンプをつけて、送信先(クラウド側)でタイムスタンプを検証する

エッジ側でMQTT接続異常を検知できるようにする

要件としてはネットワークに異常があった場合、デバイス側からエンドユーザーになんらかの処理を実行したい場合です。クラウド側で検知出来ても、ネットワーク異常時にクラウド側からデバイスに処理内容を送れません。

パブリッシュ時に検知したい場合

QoS=1の場合、PUBACKが返却されたかどうかを送信後一定期間待つことで確認可能です。awscrtの場合、以下のコードで実現可能です。ソースコード全文はshuntaka9576/reterminal-sampleを参考にしてください。注意点として、前項で記載した通り、タイムアウトしてもオンラインに戻った時点で再送されます。

try:
    timestamp = int(time.time())
    logger.info(f"publish {self.name} {timestamp}")
    publish_future, _ = self.connection.publish(
        topic=self.publish_topic_name,
        payload=json.dumps({f"{self.native_id}": timestamp}),
        qos=mqtt.QoS.AT_LEAST_ONCE,
    )

    # PUBACKが返却されるまで待機
    publish_future.result(timeout=10)

    logger.info("publish done")

    time.sleep(1)
except futures._base.TimeoutError as e:
    # 確認できない場合エラーをスロー
    logger.info(f"futures timeout error: {e}", exc_info=True)

    # cancel不可(オンラインに戻った時点で再送される)
    # publish_future.cancel()
except Exception as e:
    logger.info(f"publish error: {e}", exc_info=True)

実行すると以下の通りです。10秒たった時点でエラーがスローされていることが確認できます。本エラーをハンドリングすることでエンドユーザーにネットワークエラーを知らせることが可能です。

$ poetry run python3 ./src/offline-publish-1.py
2022-10-11 08:18:31,410 __main__ [INFO] main [MainThread] (4341024128): start main thread
2022-10-11 08:18:31,410 __main__ [INFO] main [MainThread] (4341024128): start create mqtt connection
2022-10-11 08:18:31,713 __main__ [INFO] main [MainThread] (4341024128): end create mqtt connection
2022-10-11 08:18:31,713 __main__ [INFO] run [sub/a] (6146224128): publish sub/a 1665443911
2022-10-11 08:18:31,750 __main__ [INFO] run [sub/a] (6146224128): publish done
2022-10-11 08:18:31,778 __main__ [INFO] sub_callback [Dummy-2] (6127677440): call sub_callback
2022-10-11 08:18:31,778 __main__ [INFO] sub_callback [Dummy-2] (6127677440): topic: sub/a, dup: False, qos: 1, retain: False
2022-10-11 08:18:31,778 __main__ [INFO] sub_callback [Dummy-2] (6127677440): payload: {"2028645": 1665443911}
2022-10-11 08:18:31,778 __main__ [INFO] sub_callback [Dummy-2] (6127677440): end sub_callback
2022-10-11 08:18:31,786 __main__ [INFO] sub_callback [Dummy-2] (6127677440): call sub_callback
2022-10-11 08:18:31,786 __main__ [INFO] sub_callback [Dummy-2] (6127677440): topic: sub/a, dup: False, qos: 1, retain: False
2022-10-11 08:18:31,786 __main__ [INFO] sub_callback [Dummy-2] (6127677440): payload: {"2000558": 1665442936}
2022-10-11 08:18:31,786 __main__ [INFO] sub_callback [Dummy-2] (6127677440): end sub_callback
(中略)
2022-10-11 08:18:36,932 __main__ [INFO] run [sub/a] (6146224128): publish sub/a 1665443916
2022-10-11 08:18:46,938 __main__ [INFO] run [sub/a] (6146224128): futures timeout error:
Traceback (most recent call last):
  File "/Users/shuntaka/repos/github.com/shuntaka9576/reterminal-sample/./src/offline-publish-1.py", line 41, in run
    publish_future.result(timeout=10)
  File "/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/concurrent/futures/_base.py", line 448, in result
    raise TimeoutError()
concurrent.futures._base.TimeoutError
2022-10-11 08:18:46,941 __main__ [INFO] run [sub/a] (6146224128): publish sub/a 1665443926
2022-10-11 08:18:56,942 __main__ [INFO] run [sub/a] (6146224128): futures timeout error:
Traceback (most recent call last):
  File "/Users/shuntaka/repos/github.com/shuntaka9576/reterminal-sample/./src/offline-publish-1.py", line 41, in run
    publish_future.result(timeout=10)
  File "/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/concurrent/futures/_base.py", line 448, in result
    raise TimeoutError()
concurrent.futures._base.TimeoutError
2022-10-11 08:18:56,943 __main__ [INFO] run [sub/a] (6146224128): publish sub/a 1665443936
2022-10-11 08:19:06,948 __main__ [INFO] run [sub/a] (6146224128): futures timeout error:
Traceback (most recent call last):
  File "/Users/shuntaka/repos/github.com/shuntaka9576/reterminal-sample/./src/offline-publish-1.py", line 41, in run
    publish_future.result(timeout=10)
  File "/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/concurrent/futures/_base.py", line 448, in result
    raise TimeoutError()
concurrent.futures._base.TimeoutError
2022-10-11 08:19:06,949 __main__ [INFO] run [sub/a] (6146224128): publish sub/a 1665443946

恒常的に検知したい場合

様々な実装方法がありますが、比較的簡易なものを紹介します。正直あまり筋が良いとは思ってないため、もっと良い方法があればFB頂けるとありがたいです。

  • ヘルスチェック用にトピックを定義(ヘルスチェックトピックとします)し、デバイスがヘルスチェックトピックへ一定間隔でタイムスタンプをつけたペイロードをパブリッシュ
  • 同時にデバイスは、ヘルスチェックトピックをサブスクライブし、パブリッシュした値と同じであることを確認

今回はMQTTを利用しますが、アプリケーション層より低レイヤーのネットワーク階層のプロトコルを使っても良いと思います。

ソースコード
import threading
import json
import time
from enum import Enum

from awscrt import io, mqtt

import utils.config as Config
import utils.logger as Logger

config = Config.getConfig()
Logger.init()

logger = Logger.getLogger(__name__)

AWS_IOT_ENDPOINT = config.IotEndpoint
KEEP_ALIVE_SECS = 5
PING_TIMEOUT_MS = 1000
THING_NAME = "test-thing"
HEALTH_CHECK_TOPIC_NAME = "sub/a"

TIME_OUT_SEC = 3


class HealthCheckStatus(Enum):
    NOT_INIT = "NOT_INIT"
    CONNECTED = "CONNECT"
    DISCONNECT = "DISCONNECT"


class HealthCheckTopic:
    def __init__(self, connection, thing_name: str):
        self.mqtt_connection = connection
        self.thing_name = thing_name

    def publish(self, data):
        future, packet_id = self.mqtt_connection.publish(
            topic=HEALTH_CHECK_TOPIC_NAME,
            payload=json.dumps(data),
            qos=mqtt.QoS.AT_MOST_ONCE,
        )

        return future, packet_id

    def subscribe(self, callback=None):
        self.mqtt_connection.subscribe(
            # publishしたことを確認するために利用、publishとtopicが同じ
            topic=HEALTH_CHECK_TOPIC_NAME,
            qos=mqtt.QoS.AT_MOST_ONCE,
            callback=callback,
        )


class HealthCheckUseCase:
    def __init__(self, mqtt_connection, thing_name) -> None:
        self.subscribe_notify_timestamp = None
        self.__health_check_topic = HealthCheckTopic(mqtt_connection, thing_name)

    def set_timestamp_for_subscriber(self, topic, payload, dup, qos, retain):
        payload_obj = json.loads(payload.decode("utf-8"))
        timestamp = payload_obj.get("timestamp")
        if timestamp is not None:
            self.subscribe_notify_timestamp = timestamp

    def get_status(
        self,
        time_out_sec: int,
    ):
        try:
            self.subscribe_notify_timestamp = None

            timestamp = int(time.time())

            future = self.__health_check_topic.publish(
                {"timestamp": timestamp},
            )

            retry_count = 0


            # time_out_secに指定した秒間ヘルスチェックトピックからメッセージが来ていないかを確認する
            while True:
                # ヘルスチェックトピックへパブリッシュした値とサブスクライブで到着した値が一致すれば接続中と判定
                if timestamp == self.subscribe_notify_timestamp:
                    return HealthCheckStatus.CONNECTED

                retry_count = retry_count + 1
                if retry_count == time_out_sec:
                    logger.info(
                        f"health check timeout: "
                        f"timestamp: {timestamp}, "
                        f"subscribe_notify_timestamp: {self.subscribe_notify_timestamp}"
                    )
                    future.cancel()

                    return HealthCheckStatus.DISCONNECT

                time.sleep(1)

        except Exception as e:
            return HealthCheckStatus.DISCONNECT


class HealthCheckThread(threading.Thread):
    def __init__(
        self,
        mqtt_connection: mqtt.Connection,
        thing_name: str,
    ):
        threading.Thread.__init__(self)

        self.__status = HealthCheckStatus.NOT_INIT
        self.__mqtt_connection = mqtt_connection
        self.__thing_name = thing_name

    def run(self):
        health_check_use_case = HealthCheckUseCase(
            self.__mqtt_connection, self.__thing_name
        )
        health_check_topic = HealthCheckTopic(self.__mqtt_connection, self.__thing_name)
        health_check_topic.subscribe(health_check_use_case.set_timestamp_for_subscriber)

        while True:
            try:
                self.__status = health_check_use_case.get_status(
                    TIME_OUT_SEC,
                )
                logger.info(f"status: {self.__status}")
            except Exception:
                logger.error("Unknown error", exc_info=True)

            # チェック間隔を指定
            time.sleep(1)


def createMQTTConnection(
    device_certificate_filepath: str,
    private_key_filepath: str,
    ca_certificate_filepath: str,
):
    event_loop_group = io.EventLoopGroup(1)
    host_resolver = io.DefaultHostResolver(event_loop_group)
    client_bootstrap = io.ClientBootstrap(event_loop_group, host_resolver)
    client_bootstrap = io.ClientBootstrap(event_loop_group, host_resolver)

    tls_ctx_options = io.TlsContextOptions.create_client_with_mtls_from_path(
        device_certificate_filepath, private_key_filepath
    )
    tls_ctx_options.override_default_trust_store_from_path(
        None, ca_certificate_filepath
    )
    tls_ctx = io.ClientTlsContext(tls_ctx_options)

    mqtt_client = mqtt.Client(client_bootstrap, tls_ctx)
    mqtt_connection = mqtt.Connection(
        client=mqtt_client,
        client_id=THING_NAME,
        host_name=AWS_IOT_ENDPOINT,
        port=8883,
        clean_session=False,
        keep_alive_secs=KEEP_ALIVE_SECS,
        ping_timeout_ms=PING_TIMEOUT_MS,
    )

    connect_future = mqtt_connection.connect()
    connect_future.result()

    return mqtt_connection


def main():
    logger.info("start main thread")
    logger.info("start create mqtt connection")
    connection = createMQTTConnection(
        device_certificate_filepath="./data/device_certificate.pem",
        private_key_filepath="./data/private.key",
        ca_certificate_filepath="./data/ca_certificate.pem",
    )
    logger.info("end create mqtt connection")

    health_check_thread = HealthCheckThread(mqtt_connection=connection, thing_name=THING_NAME)
    health_check_thread.daemon = True
    health_check_thread.name = "health_check_thread"
    health_check_thread.start()

    while True:
        time.sleep(10)


if __name__ == "__main__":
    main()
実行ログ

ネットワークをOFF->ONしたタイムスタンプ

$ networksetup -SetAirportPower en0 off;date +%s
1665448588
$ networksetup -SetAirportPower en0 on;date +%s
1665448592

アプリ実行ログ

$ poetry run python3 ./src/health-check.py
2022-10-11 09:36:22,063 __main__ [INFO] main [MainThread] (4345300352): start main thread
2022-10-11 09:36:22,063 __main__ [INFO] main [MainThread] (4345300352): start create mqtt connection
2022-10-11 09:36:22,435 __main__ [INFO] main [MainThread] (4345300352): end create mqtt connection
2022-10-11 09:36:23,441 __main__ [INFO] run [health_check_thread] (6144815104): status: HealthCheckStatus.CONNECTED
2022-10-11 09:36:25,442 __main__ [INFO] run [health_check_thread] (6144815104): status: HealthCheckStatus.CONNECTED
2022-10-11 09:36:27,453 __main__ [INFO] run [health_check_thread] (6144815104): status: HealthCheckStatus.CONNECTED
2022-10-11 09:36:30,468 __main__ [INFO] get_status [health_check_thread] (6144815104): health check timeout: timestamp: 1665448588, subscribe_notify_timestamp: None
2022-10-11 09:36:30,469 __main__ [INFO] run [health_check_thread] (6144815104): status: HealthCheckStatus.DISCONNECT
2022-10-11 09:36:33,478 __main__ [INFO] get_status [health_check_thread] (6144815104): health check timeout: timestamp: 1665448591, subscribe_notify_timestamp: None
2022-10-11 09:36:33,478 __main__ [INFO] run [health_check_thread] (6144815104): status: HealthCheckStatus.DISCONNECT
2022-10-11 09:36:36,487 __main__ [INFO] get_status [health_check_thread] (6144815104): health check timeout: timestamp: 1665448594, subscribe_notify_timestamp: None
2022-10-11 09:36:36,487 __main__ [INFO] run [health_check_thread] (6144815104): status: HealthCheckStatus.DISCONNECT
2022-10-11 09:36:39,498 __main__ [INFO] get_status [health_check_thread] (6144815104): health check timeout: timestamp: 1665448597, subscribe_notify_timestamp: None
2022-10-11 09:36:39,498 __main__ [INFO] run [health_check_thread] (6144815104): status: HealthCheckStatus.DISCONNECT
2022-10-11 09:36:41,508 __main__ [INFO] run [health_check_thread] (6144815104): status: HealthCheckStatus.CONNECTED
2022-10-11 09:36:43,514 __main__ [INFO] run [health_check_thread] (6144815104): status: HealthCheckStatus.CONNECTED
2022-10-11 09:36:45,524 __main__ [INFO] run [health_check_thread] (6144815104): status: HealthCheckStatus.CONNECTED

ネットワークをOFFにした1665448588(2022 09:36:28)の2秒後に切断を検知、ネットワークをONにした1665448592(2022 09:36:32)から大体7秒後に復帰を検知しています。

IoTポリシーサイズ上限に注意する

ポリシードキュメントの最大サイズはスペースを除く2048文字です。(参考)こちらは上限緩和できるクォータではありません。

ポリシーは証明書に複数アタッチ可能で、役割ごとに余裕を持たせたIoTポリシーを複数アタッチするのが良いと思われます。

既に大量に証明書がある際に上限を超えた場合、以下の対応が必要なので認識しておくだけでも違うと思います。

  • 既存証明書へポリシー追加アタッチするスクリプトの作成、実行
  • 証明書作成時に新しいポリシーを追加アタッチする実装の追加

その他

その他機能面でやって良かったことを書きます。中には初歩的なこともありますが、改めて見て頂けると幸いです。

ログをCloudWatch Logsと同期する

クラウド側からエッジ側のログを確認できるとデバイスの状況を確認する上で便利でした。

デバイスのログは、watchtowerを利用して、クラウド側に保存しました。デバイス証明書を利用し、発行したAWS認証情報を利用して送信を行いました。(参考:AWS IoT Core認証情報プロバイダーを使用して、AWSサービスの直接呼出しを認証)。認証情報プロバイダーを利用した場合、クレデンシャルの期限は1時間であるため、1 時間ごとにローテートする必要があります。

ただ本方法はエッジ側がオフライン->オンラインになった場合、送信していなかった箇所を再送してくれる訳ではないです。故に参考程度です。トラブルシュートが必要な場合は、Soracom Napter経由でログを吸い上げて確認しています。

スレッドを監視する

ハードウェア側から割り込みで処理が必要になるケースがあり、常駐で起動しているスレッドを使う場面がありました。ただそのスレッドが意図せず停止しているケースがあったため監視をし、停止してはいけないスレッドが停止している場合異常終了するようにしました。systemdで再起動するようにOS側で設定しているため、サービスを再起動して復帰するという流れになります。

以下のような関数を定期的に実行すると良いと思います。ただソフトの開発ある程度進んでから、このエラーは全く見なくなりました。

for thread in threads:
    # スレッドの生存を確認
    if not thread.is_alive():
        thread_name = thread.getName()

        logger.error(f"{thread_name} is not alive.")

        raise # 独自エラーをraizeする

リモートでアップデート可能にしておく

スケジュール通り作り込めないケースがほとんどだと思います。なるべく早い段階でアプリの処理初めの方にアップデートする機構を作っておくと後々機能改善がスムーズになります。アップデートにはいくつか方法がありますが、例えばIoT Jobsだったり、最悪Soracom Napterで直接SSHできるようにしておくと現場に行かずにトラブルシュートが可能で便利だと思います。

アプリで時刻を扱う場合、時刻同期タイミングをチェックする

Raspberry PiにはRTCを搭載していないため、シャットダウンした時刻から次の処理が始まります。依存関係なくsystemdで起動登録している場合は、同期される前にほぼアプリケーションが始まると思って良いと思います。

時刻同期していない状態でアプリを動作すると、当たり前ですが以下のような問題が発生します

  • 時刻を検証の要素としているトークン検証に失敗する(AWS Signature Version 4, JWT など)
  • 時刻同期された時点で、ログに出力されている時刻が飛ぶため信頼性が下がる

アプリ側で時刻が同期されるまで待機する場合コードは以下の通りです。timedatectl というCLIを利用して時刻同期の状態を逐次取得する関数を定義し、同期されるまで後続処理をブロックするのが良いと思います。

import re
import subprocess

SYNCHRONIZED_STATUS_PATTERN = ".*?System clock synchronized: (.*?)\n"

def is_time_sync() -> bool:
    execTimedatectlResult = subprocess.run(
        ["timedatectl", "status"], encoding="utf-8", stdout=subprocess.PIPE
    )
    regexpResult = re.match(
        SYNCHRONIZED_STATUS_PATTERN, execTimedatectlResult.stdout, re.S
    )

    if regexpResult is not None:
        synchronizedStatus = regexpResult.group(1)

        if synchronizedStatus == "yes":
            return True
        else:
            return False

ログをローテートする

ストレージには限りがありますのでローテートしましょう。Pythonのデフォルトのloggingパッケージで可能です。

異常時にリソースの解放を行う

GPIOやソケット通信のコネクションは、終了シグナルを受け取ったり異常終了した際に、必ず解放するようにします。解放すること異常終了後のサービス再起動時に正常稼働できる可能性が高まります。

最終的には本番と同様の起動方法でテストする

systemd経由だと他のサービスの起動状況によって、接続されているハードウェア設定の初期化状態(QRリーダーのキーボード設定など)が変わる場合があります。早い段階でsystemdで起動した状態でテストできれば、その分バグが見つかるのは早くなります。

事前にやっておくとよかったこと

利用するハードウェアのライブラリが用意されているかを早めに確認する

ハードウェアと連携する場合、利用したいライブラリがCしかないケースは多々あります。 Cに精通しているメンバー入れば良いですが、いない場合はソフトを書く言語がPythonの場合、PythonのCバインディングライブラリがないかなど事前に確認しておくとスケジュールが読みやすくなると思います。

最後に

本記事ではRaspberry Pi OS上で動作するIoTエッジ開発でやって良かったことを書きました。当然ですがエッジデバイスをサーバーに見立てたら、EC2やECSで動作するサーバーサイドアプリ同様で、気にする観点として共通しているものも多かったと思います。 1つ1つもう少し掘りさげられるともっと良い解決策が見つかったかもしれません。より良い方法があればフィードバック頂けると幸いです。

付録

サンプルコードの設定方法

コンフィグ設定

cp ./src/utils/_config.py ./src/utils/config.py

IoT Coreのatsエンドポイントを設定

def getConfig():
    config = type(
        "Config",
        (object,),
        {
            # FQDNのみでOK
            "IotEndpoint": "xxxxxxxxxxxxx-ats.iot.ap-northeast-1.amazonaws.com",
        },
    )

    return config

依存ライブラリのインストール

make install

# or
poetry install

MQTT接続に必要なIoT Coreリソース作成

one-click証明書を利用します

wget https://www.amazontrust.com/repository/AmazonRootCA1.pem
mv ./AmazonRootCA1.pem ./data/ca_certificate.pem
aws iot create-keys-and-certificate \
  --set-as-active \
  --certificate-pem-outfile ./data/device_certificate.pem \
  --public-key-outfile ./data/public.key \
  --private-key-outfile ./data/private.key >./data/cert.json
export POLICY_NAME="test-policy"
export THING_NAME="test-thing"

# ポリシーを作成
aws iot create-policy \
  --policy-name test-policy \
  --policy-document file://iot-policy.json

# 証明書にポリシーをアタッチ
aws iot attach-policy \
  --policy-name $POLICY_NAME \
  --target `cat ./data/cert.json | jq -r ".certificateArn"`

# モノの作成
aws iot create-thing \
  --thing-name $THING_NAME

# モノと証明書をアタッチ
aws iot attach-thing-principal \
  --thing-name $THING_NAME \
  --principal `cat ./data/cert.json | jq -r ".certificateArn"`

AWS IoTのログ有効化方法

スクリーンショット 2022-10-10 10 23 51

Share this article

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.